package chagine.core.util;

import chagine.core.DefaultConstants;
import chagine.core.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * redis轮询列队
 */
@Slf4j
public abstract class RedisQueue<T> implements DisposableBean {

    protected final RedisTemplate<String, T> redisTemplate;
    protected final String keyName;
    protected final String title;
    protected final int threadNum;
    // 空载计数
    private final AtomicInteger logCount = new AtomicInteger();
    // 运行停止
    protected boolean runStop = false;

    protected RedisQueue(RedisTemplate<String, T> redisTemplate, String keyName, String title) {
        this(redisTemplate, keyName, title, null);
    }

    protected RedisQueue(RedisTemplate<String, T> redisTemplate, String keyName, String title, Integer threadNum) {
        this.redisTemplate = redisTemplate;
        this.keyName = keyName;
        this.title = title;
        if (threadNum == null) {
            threadNum = 1;
        }
        if (threadNum < 1) {
            threadNum = 1;
        }
        this.threadNum = threadNum;
    }

    @PostConstruct
    public void init() {
        for (int i = 1; i <= threadNum; i++) {
            createThread(i);
        }
    }

    private void createThread(int threadNum) {
        new Thread(() -> {
            log.info("【{}】运行Redis列队:[{}]，线程[{}]", this.title, this.keyName, threadNum);
            ListOperations<String, T> operations = this.redisTemplate.opsForList();
            while (true) {
                if (runStop) {
                    log.info("【{}】Redis列队，服务停止，线程[{}]", this.title, threadNum);
                    break;
                }
                if (!DefaultConstants.SYSTEM_IS_RUN) {
                    // 系统未开始进入运行状态，跳过步骤
                    log.info("【{}】系统未进入运行状态，线程[{}]", this.title, threadNum);
                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (InterruptedException ignore) {
                    }
                    continue;
                }
                T task = null;
                try {
                    task = operations.rightPop(this.keyName, 10, TimeUnit.SECONDS);
                } catch (Exception e) {
                    log.error("【{}】线程[{}]列队获取异常>>>>>", this.title, threadNum, e);
                }
                if (task != null) {
                    try {
                        processingTask(task);
                        successTask(task);
                        successCallbackTask(task);
                    } catch (Exception e) {
                        log.error("【{}】线程[{}]列队运行异常 >>>>>", this.title, threadNum, e);
                        failTask(task, e);
                        failCallbackTask(task, e);
                    }
                    // 重新计数
                    logCount.set(0);
                } else {
                    int num = logCount.addAndGet(1);
                    if (num % 60 == 0) {
                        log.info("【{}】线程[{}]列队，无数据", this.title, threadNum);
                        // 重新计数
                        logCount.set(0);
                    }
                }
            }
        }).start();
    }

    /**
     * 添加发送任务
     */
    public void pushTask(T task) {
        Assert.error(task == null, ErrorCode.BUSINESS_EXCEPTION, this.title + "，提交任务为空");
        ListOperations<String, T> operations = this.redisTemplate.opsForList();
        operations.leftPush(this.keyName, task);
    }

    /**
     * 批量添加任务
     */
    public void pushTask(List<T> tasks) {
        Assert.error(tasks == null || tasks.isEmpty(), ErrorCode.BUSINESS_EXCEPTION, this.title + "，批量提交任务为空");
        for (T task : tasks) {
            Assert.error(task == null, ErrorCode.BUSINESS_EXCEPTION, this.title + "，批量提交的任务存在空值");
        }
        ListOperations<String, T> operations = this.redisTemplate.opsForList();
        operations.leftPushAll(this.keyName, tasks);
    }

    /**
     * 处理任务
     */
    protected abstract void processingTask(T task);

    /**
     * 处理成功任务
     */
    protected abstract void successTask(T task);

    /**
     * 成功响应回调
     */
    protected void successCallbackTask(T task) {
        // 回调方法不一定要实现
    }

    /**
     * 处理失败任务
     */
    protected abstract void failTask(T task, Throwable throwable);

    /**
     * 失败响应回调
     */
    protected void failCallbackTask(T task, Throwable throwable) {
        // 回调方法不一定要实现
    }


    @Override
    public void destroy() throws Exception {
        runStop = true;
    }
}
